package org.codehaus.activemq.service.impl;

import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueList;
import org.codehaus.activemq.service.QueueListEntry;
import org.codehaus.activemq.service.QueueMessageContainer;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.store.PersistenceAdapter;
import org.codehaus.activemq.util.Callback;
import org.codehaus.activemq.util.TransactionTemplate;

public class DurableQueueMessageContainer implements QueueMessageContainer {
	private static final Log log = LogFactory.getLog(DurableQueueMessageContainer.class);
	private MessageStore messageStore;
	private String destinationName;
	private QueueList messagesToBeDelivered;
	private QueueList deliveredMessages;
	private PersistenceAdapter persistenceAdapter;
	private TransactionTemplate transactionTemplate;

	public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore,
			String destinationName) {
		this(persistenceAdapter, messageStore, destinationName, new DefaultQueueList(), new DefaultQueueList());
	}

	public DurableQueueMessageContainer(PersistenceAdapter persistenceAdapter, MessageStore messageStore,
			String destinationName, QueueList messagesToBeDelivered, QueueList deliveredMessages) {
		this.persistenceAdapter = persistenceAdapter;
		this.messageStore = messageStore;
		this.destinationName = destinationName;
		this.messagesToBeDelivered = messagesToBeDelivered;
		this.deliveredMessages = deliveredMessages;
		this.transactionTemplate = new TransactionTemplate(persistenceAdapter);
	}

	public String getDestinationName() {
		return this.destinationName;
	}

	public synchronized MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
		MessageIdentity answer = this.messageStore.addMessage(message);
		this.messagesToBeDelivered.add(answer);
		return answer;
	}

	public synchronized void delete(MessageIdentity messageID, MessageAck ack) throws JMSException {
		boolean found = false;
		QueueListEntry entry = this.deliveredMessages.getFirstEntry();
		while (entry != null) {
			MessageIdentity identity = (MessageIdentity) entry.getElement();
			if (messageID.equals(identity)) {
				this.deliveredMessages.remove(entry);

				this.messageStore.removeMessage(identity, ack);
				found = true;
				break;
			}
			entry = this.deliveredMessages.getNextEntry(entry);
		}
		if (!found) {
			entry = this.messagesToBeDelivered.getFirstEntry();
			while (entry != null) {
				MessageIdentity identity = (MessageIdentity) entry.getElement();
				if (messageID.equals(identity)) {
					this.messagesToBeDelivered.remove(entry);

					this.messageStore.removeMessage(identity, ack);
					found = true;
					break;
				}
				entry = this.messagesToBeDelivered.getNextEntry(entry);
			}

			if (!found) {
				log.error("Attempt to acknowledge unknown messageID: " + messageID);
				this.deliveredMessages.remove(messageID);
			}
		}
	}

	public synchronized ActiveMQMessage getMessage(MessageIdentity messageID) throws JMSException {
		return this.messageStore.getMessage(messageID);
	}

	public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
		return getMessage(messageIdentity) != null;
	}

	public void registerMessageInterest(MessageIdentity messageIdentity) {
	}

	public void unregisterMessageInterest(MessageIdentity messageIdentity, MessageAck ack) {
	}

	public synchronized ActiveMQMessage poll() throws JMSException {
		ActiveMQMessage message = null;
		MessageIdentity messageIdentity = (MessageIdentity) this.messagesToBeDelivered.removeFirst();
		if (messageIdentity != null) {
			message = this.messageStore.getMessage(messageIdentity);
			this.deliveredMessages.add(messageIdentity);
		}
		return message;
	}

	public synchronized ActiveMQMessage peekNext(MessageIdentity messageID) throws JMSException {
		ActiveMQMessage answer = null;
		if (messageID == null) {
			MessageIdentity identity = (MessageIdentity) this.messagesToBeDelivered.getFirst();
			if (identity != null)
				answer = this.messageStore.getMessage(identity);
		} else {
			int index = this.messagesToBeDelivered.indexOf(messageID);
			if ((index >= 0) && (index + 1 < this.messagesToBeDelivered.size())) {
				messageID = (MessageIdentity) this.messagesToBeDelivered.get(index + 1);
				if (messageID != null) {
					answer = this.messageStore.getMessage(messageID);
				}
			}
		}
		return answer;
	}

	public synchronized void returnMessage(MessageIdentity messageIdentity) throws JMSException {
		boolean result = this.deliveredMessages.remove(messageIdentity);
		this.messagesToBeDelivered.addFirst(messageIdentity);
	}

	public synchronized void reset() throws JMSException {
		int count = 0;
		MessageIdentity messageIdentity = (MessageIdentity) this.deliveredMessages.removeFirst();
		while (messageIdentity != null) {
			this.messagesToBeDelivered.add(count++, messageIdentity);
			messageIdentity = (MessageIdentity) this.deliveredMessages.removeFirst();
		}
	}

	public synchronized void start() throws JMSException {
		QueueMessageContainer container = this;
		this.transactionTemplate.run(new Callback() {
			private final QueueMessageContainer val$container=container;

			public void execute() throws Throwable {
				DurableQueueMessageContainer.this.messageStore.start();
				DurableQueueMessageContainer.this.messageStore.recover(this.val$container);
			}
		});
	}

	public synchronized void recoverMessageToBeDelivered(MessageIdentity messageIdentity) throws JMSException {
		this.messagesToBeDelivered.add(messageIdentity);
	}

	public void stop() throws JMSException {
		this.messageStore.stop();
	}
}